rabbitmq 基于TTL和Dead Letter特性实现延迟队列

前言

首先,rabbitmq并没有直接支持延迟队列这样的特性,我们必须通过rabbitmq提供的TTL(Time-To-Live:用于设置消息和队列的有效时间)和 Dead Letter(死信队列DLX ,寓意是存放那些不可送达消息的队列)这两个特性来间接的实现延迟队列的效果;接下来,将详细介绍TTL和 Dead Letter特性,以及如何使用它们来实现延迟队列的效果。

TTL

rabbitmq支持设置队列和消息的有效期(即Time-To-Live)。

Per-Queue Message TTL(设置queue中所有消息的有效期):

  1. 通过queue.declare声明队列的时候,在args参数中设置x-message-ttl的值指定

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 60000); //单位:毫秒
    channel.queueDeclare("myqueue", false, false, false, args);
  2. 通过添加policy的方式指定

    image

    或通过命令行

    ./rabbitmqctl set_policy -p /test TTL-Policy "^ttl-" '{"message-ttl":60000}' --apply-to queues --priority 1

Per-Message TTL(设置单条消息的有效期):

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000"); //设置单个消息有效期为60秒
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

如果在单条消息上设置了有效期,又在队列中设置了queue scope域的消息有效期,则取两者之间的最小值作为消息实际的有效期。

同时,我们可以对已经存在消息的队列使用TTL策略,但这涉及一些注意事项。TTL策略会应用到这些已经存在的消息上,并且只有当过期的消息到达queue的head头部时,才会被丢弃或转发到下面所说的DLX中。消费者是不会收到过期的消息的,但是有一点必须记住,在信息过期和消费者投放之间可能存在竞争条件,例如消息可以在它被写入套接字之后但在它到达消费者之前过期等,同时过期的消息也会被计入到queue消息数量的统计中。

如果设置Per-Queue Message TTL不会有问题,因为快要过期的消息总是在队列的前边;但是如果使用Per-Message TTL的话,过期的消息有可能会在未过期的消息后边,直到前边的消息过期或者被消费。因为RabbitMQ保证过期的消息一定不会被消费者消费,但是不能保证消息过期就会从队列中移除。

Queue TTL,我们除了可以设置队列消息的TTL之外,还可以设置queue的TTL。

  1. 通过queue.declare声明队列的时候,在args参数中设置x-expires的值指定

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-expires", 1800000);// 有效期30分钟
    channel.queueDeclare("myqueue", false, false, false, args);
  2. 通过添加policy的方式指定

    ./rabbitmqctl set_policy -p /test TTL-Queue-Policy "^ttl-queue-" '{"expires":1800000}' --apply-to queues --priority 1

    具体在管理界面的操作,请参考Per-Queue Message TTL设置。

意思是指在x-expires时间内,如果队列没有活跃(即没有被使用),服务器将会删除该队列。有点类似于java中线程池对线程管理的最大空闲时间。

应用场景:可以作为RPC远程过程调用的答复队列等。

Dead Letter

队列中的消息可以是“死信”(即不可送达的信息),一般在发生以下事件时会将这种消息重新发布到另一个exchange:

  1. 消费端调用basic.reject或basic.nack拒绝消息且requeue=false不重新放回队列;
  2. 队列中的消息过期;
  3. 队列中消息长度达到最大限制;

官方原文:

  1. rejected - the message was rejected with requeue=false;
  2. expired - the TTL of the message expired;
  3. maxlen - the maximum allowed queue length was exceeded;

Dead Letter exchanges(DLXs)就是一种普通的exchange,可以是fanout,direct,topic,headers中的任意一种类型。我们在声明一个队列的时候,可以为这个队列指定一个Dead Letter exchanges(DLXs),当这个队列中的消息被拒绝且requeue=false,过期,或达到队列最大长度时,这些消息将被转发到此队列配置的DLXs。

具体配置如下:

  1. 通过queue.declare声明队列的时候,在args参数中设置x-dead-letter-exchange的值指定

    // 声明一个普通direct模式的exchange作为DLXs
    channel.exchangeDeclare("some.exchange.name", "direct");


    Map<String, Object> args = new HashMap<String, Object>();
    // 将上述声明的exchange作为此队列的Dead Letter exchanges(DLXs)
    args.put("x-dead-letter-exchange", "some.exchange.name");
    // 重新设置队列中所产生dead-letter message的route-key
    args.put("x-dead-letter-routing-key", "some-routing-key");
    channel.queueDeclare("myqueue", false, false, false, args);
  2. 通过添加policy的方式指定

    ./rabbitmqctl set_policy -p /test DLX-Queue-Policy "^dlx-queue-" '{"dead-letter-exchange":"some.exchange.name", 
    "dead-letter-routing-key":"some-routing-key"}' --apply-to queues --priority 1

注1:当队列queue.declare声明时,Dead Letter exchanges(DLXs)并不是必须要声明的,但是如果有dead-lettered message产生的时候,则DLXs必须要存在,如果不存在的话,就会出现dead-lettered message被直接丢弃。在转发dead-lettered message时,我们可以通过在声明queue时指定x-dead-letter-routing-key属性值为其重新设置route-key,如果没有设置的话,则使用message自己的route-key(即最初publish时指定的route-key)。

注2:Dead-lettered messages被重新publish时,是开启了publisher confirms模式的;也就是说,只有当dead-letter queues(DLX routing targets)确认收到了Dead-lettered messages之后,才会通知原队列删除该message,以确保消息不会丢失。

延迟队列

实现原理图:
image

下面是使用rabbitmq TTL和Dead Letter特性实现的延迟队列的示例代码:

# producer client: 
// 声明一个普通direct模式的exchange作为DLXs
channel.exchangeDeclare("dlxs-exchange", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlxs-exchange");
args.put("x-dead-letter-routing-key", "main-queue");
channel.queueDeclare("delay-queue", false, false, false, args);

// define and send delay-message(TTL = 60 seconds)
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration("60000").deliveryMode(2).build();
channel.basicPublish("", "delay-queue", properties, "nop".getBytes());

# consumer client:
// 声明消费者
boolean autoAck = false;
channel.basicConsume("main-queue", autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false); // 消费端确认成功处理消息
}
});

应用场景

  1. 用户注册成功后,1分钟之后给用户发送欢迎邮件;
  2. 2分钟后再次尝试回调;
  3. ……

参考链接

  1. https://www.rabbitmq.com/ttl.html
  2. https://www.rabbitmq.com/dlx.html
  3. http://www.rabbitmq.com/ttl.html#per-queue-message-ttl
  4. https://www.rabbitmq.com/parameters.html#policies